{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "3307b886",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: An illegal reflective access operation has occurred\n",
      "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
      "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
      "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
      "WARNING: All illegal access operations will be denied in a future release\n",
      "22/02/17 22:43:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder \\\n",
    "    .master(\"local[*]\") \\\n",
    "    .appName('test') \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "1ee1eb1d",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df_green = spark.read.parquet('data/pq/green/*/*')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0ca5ee99",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "649bb4da",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green = df_green \\\n",
    "    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \\\n",
    "    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "90cd6845",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_yellow = spark.read.parquet('data/pq/yellow/*/*')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "88822efd",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_yellow = df_yellow \\\n",
    "    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \\\n",
    "    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "610167a2",
   "metadata": {},
   "outputs": [],
   "source": [
    "common_colums = []\n",
    "\n",
    "yellow_columns = set(df_yellow.columns)\n",
    "\n",
    "for col in df_green.columns:\n",
    "    if col in yellow_columns:\n",
    "        common_colums.append(col)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "839d773f",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "2498810a",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green_sel = df_green \\\n",
    "    .select(common_colums) \\\n",
    "    .withColumn('service_type', F.lit('green'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "19032efc",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_yellow_sel = df_yellow \\\n",
    "    .select(common_colums) \\\n",
    "    .withColumn('service_type', F.lit('yellow'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "id": "f5b0f3d1",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_trips_data = df_green_sel.unionAll(df_yellow_sel)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "id": "1bed8b33",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+--------+\n",
      "|service_type|   count|\n",
      "+------------+--------+\n",
      "|       green| 2304517|\n",
      "|      yellow|39649199|\n",
      "+------------+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_trips_data.groupBy('service_type').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "id": "28cc8fa3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['VendorID',\n",
       " 'pickup_datetime',\n",
       " 'dropoff_datetime',\n",
       " 'store_and_fwd_flag',\n",
       " 'RatecodeID',\n",
       " 'PULocationID',\n",
       " 'DOLocationID',\n",
       " 'passenger_count',\n",
       " 'trip_distance',\n",
       " 'fare_amount',\n",
       " 'extra',\n",
       " 'mta_tax',\n",
       " 'tip_amount',\n",
       " 'tolls_amount',\n",
       " 'improvement_surcharge',\n",
       " 'total_amount',\n",
       " 'payment_type',\n",
       " 'congestion_surcharge',\n",
       " 'service_type']"
      ]
     },
     "execution_count": 40,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_trips_data.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "id": "36e90cbc",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_trips_data.registerTempTable('trips_data')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "id": "d0e01bf1",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+--------+\n",
      "|service_type|count(1)|\n",
      "+------------+--------+\n",
      "|       green| 2304517|\n",
      "|      yellow|39649199|\n",
      "+------------+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    service_type,\n",
    "    count(1)\n",
    "FROM\n",
    "    trips_data\n",
    "GROUP BY \n",
    "    service_type\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b2ee7038",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_result = spark.sql(\"\"\"\n",
    "SELECT \n",
    "    -- Revenue grouping \n",
    "    PULocationID AS revenue_zone,\n",
    "    date_trunc('month', pickup_datetime) AS revenue_month, \n",
    "    service_type, \n",
    "\n",
    "    -- Revenue calculation \n",
    "    SUM(fare_amount) AS revenue_monthly_fare,\n",
    "    SUM(extra) AS revenue_monthly_extra,\n",
    "    SUM(mta_tax) AS revenue_monthly_mta_tax,\n",
    "    SUM(tip_amount) AS revenue_monthly_tip_amount,\n",
    "    SUM(tolls_amount) AS revenue_monthly_tolls_amount,\n",
    "    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,\n",
    "    SUM(total_amount) AS revenue_monthly_total_amount,\n",
    "    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,\n",
    "\n",
    "    -- Additional calculations\n",
    "    AVG(passenger_count) AS avg_monthly_passenger_count,\n",
    "    AVG(trip_distance) AS avg_monthly_trip_distance\n",
    "FROM\n",
    "    trips_data\n",
    "GROUP BY\n",
    "    1, 2, 3\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "id": "f67eeb92",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f56a885d",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
